Fix UUID support #2007
Conversation
Following issue #1986, I was about to open a smaller PR, not yet knowing about the extra Spark-related complications. In case it's useful, the only extra thing I had that you haven't (yet) added is a small unit test in tests/io/test_pyarrow_visitor.py at roughly line 235:
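The original snippet did not survive the copy, so here is only a rough sketch of what such a test might look like. `schema_to_pyarrow`, `Schema`, `NestedField`, and `UUIDType` are existing PyIceberg names, but the exact Arrow type that UUID maps to (a UUID extension type vs. a plain 16-byte fixed-size binary) varies with the PyIceberg and PyArrow versions, so the assertion below only checks the underlying 16-byte storage and should be treated as an assumption, not the author's actual test:

```python
# Hypothetical sketch of a UUID conversion test for tests/io/test_pyarrow_visitor.py.
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, UUIDType


def test_uuid_type_to_pyarrow() -> None:
    iceberg_schema = Schema(
        NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False),
    )
    arrow_type = schema_to_pyarrow(iceberg_schema).field("uuid_col").type

    # Extension types (e.g. pa.uuid()) wrap a storage type; otherwise use the type itself.
    storage_type = getattr(arrow_type, "storage_type", arrow_type)

    # Either way, the underlying representation should be a 16-byte fixed-size binary.
    assert storage_type == pa.binary(16)
```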
LGTM!
Thanks @kevinjqliu
* Spark: Support Parquet dictionary encoded UUIDs

  While fixing some issues on the PyIceberg end to fully support UUIDs (apache/iceberg-python#2007) I noticed this issue, and was surprised since UUID used to work with Spark; it turns out that dictionary-encoded UUIDs were not implemented yet. For PyIceberg we only generate small amounts of data, so this wasn't caught previously.

* Add another test
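As background for the dictionary-encoding angle (this is not code from the PR, and the file path is just a placeholder): PyArrow's Parquet writer dictionary-encodes columns by default, including the 16-byte fixed-length binary that backs UUID values, so a reader has to handle the dictionary-decoded path. A quick way to see the encoding a writer produced:

```python
# Illustrative only: shows that a fixed-size-binary (UUID-shaped) column is
# typically dictionary-encoded when written to Parquet with default settings.
import uuid

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table(
    {"uuid_col": pa.array([uuid.uuid4().bytes for _ in range(10)], type=pa.binary(16))}
)
pq.write_table(table, "/tmp/uuids.parquet")  # use_dictionary defaults to True

column_meta = pq.read_metadata("/tmp/uuids.parquet").row_group(0).column(0)
print(column_meta.physical_type)  # FIXED_LEN_BYTE_ARRAY
print(column_meta.encodings)      # usually includes PLAIN_DICTIONARY / RLE_DICTIONARY
```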
# Rationale for this change

The UUID support is a gift that keeps on giving. The current support in PyIceberg is incomplete and problematic, mostly because:

- It is an extension type in Arrow, which means it is not fully supported: apache/arrow#46469, apache/arrow#46468
- It doesn't have native support in Spark, where it is converted into a string. This limits the current tests, which are mostly Spark-based.

I think we have to wait for some fixes in Arrow upstream until we can fully support this. In PyIceberg, we're converting the `fixed[16]` to a `UUID`, but Spark errors because the logical type annotation in Parquet is missing:

```
E   py4j.protocol.Py4JJavaError: An error occurred while calling o72.collectToPython.
E   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost executor driver): java.lang.UnsupportedOperationException: Unsupported type: UTF8String
E       at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
E       at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:143)
E       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
E       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E       at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
E       at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
E       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
E       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
E       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
E       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
E       at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
E       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
E       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
E       at org.apache.spark.scheduler.Task.run(Task.scala:141)
E       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
E       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
E       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
E       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
E       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
E       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
E       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
E       at java.base/java.lang.Thread.run(Thread.java:829)
E
E   Driver stacktrace:
E       at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
E       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
E       at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
E       at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
E       at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
E       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
E       at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
E       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
E       at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
E       at scala.Option.foreach(Option.scala:407)
E       at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
E       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
E       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
E       at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
E       at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
E       at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
E       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
E       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
E       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
E       at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
E       at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
E       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E       at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
E       at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
E       at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
E       at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4149)
E       at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
E       at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
E       at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
E       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
E       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
E       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
E       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
E       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
E       at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
E       at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4146)
E       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
E       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
E       at py4j.Gateway.invoke(Gateway.java:282)
E       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E       at py4j.commands.CallCommand.execute(CallCommand.java:79)
E       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
E       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
E       at java.base/java.lang.Thread.run(Thread.java:829)
E   Caused by: java.lang.UnsupportedOperationException: Unsupported type: UTF8String
E       at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
E       at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:143)
E       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
E       at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E       at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
E       at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
E       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
E       at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
E       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
E       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
E       at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
E       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
E       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
E       at org.apache.spark.scheduler.Task.run(Task.scala:141)
E       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
E       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
E       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
E       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
E       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
E       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
E       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
E   ... 1 more
```

# Are these changes tested?

# Are there any user-facing changes?

Closes #1986
Closes #2002

---------

Co-authored-by: DinGo4DEV <[email protected]>
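Not part of the change itself, but a quick way to verify the missing logical type annotation described in the rationale above is to inspect the low-level Parquet schema with PyArrow. The file path below is just a placeholder:

```python
# Inspect the Parquet (not Arrow) schema for the UUID logical type annotation.
import pyarrow.parquet as pq

parquet_schema = pq.ParquetFile("data.parquet").schema  # "data.parquet" is a placeholder
column = parquet_schema.column(0)
print(column.physical_type)  # FIXED_LEN_BYTE_ARRAY for a 16-byte UUID column
print(column.logical_type)   # should report UUID once the annotation is actually written
```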